package org.apache.seatunnel.connectors.seatunnel.gaussdbredis.sink;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.gaussdbredis.config.GaussDBRedisConfig;
import org.apache.seatunnel.connectors.seatunnel.gaussdbredis.config.GaussDBRedisParameters;
import org.apache.seatunnel.connectors.seatunnel.gaussdbredis.exception.GaussDBRedisConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import java.io.IOException;

/**
 * Sink plugin registered under the name {@code "GaussDB-Redis"}.
 *
 * <p>The class is exposed as a {@link SeaTunnelSink} service through {@link AutoService}. It
 * checks the required options in {@link #prepare(Config)}, remembers the row type handed over
 * via {@link #setTypeInfo(SeaTunnelRowType)}, and passes both pieces of state to a new {@code
 * GaussDBRedisSinkWriter} in {@link #createWriter(SinkWriter.Context)}.
 */
@AutoService(SeaTunnelSink.class)
public class GaussDBRedisSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
    /** Parameter holder populated from the plugin config during {@link #prepare(Config)}. */
    private final GaussDBRedisParameters redisParameters = new GaussDBRedisParameters();

    /** Row type supplied through {@link #setTypeInfo(SeaTunnelRowType)}. */
    private SeaTunnelRowType seaTunnelRowType;

    /** Config object received by {@link #prepare(Config)}; kept as-is, not read in this class. */
    private Config pluginConfig;

    /**
     * Returns the identifier of this plugin.
     *
     * @return the constant plugin name
     */
    @Override
    public String getPluginName() {
        return "GaussDB-Redis";
    }

    /**
     * Stores the given config, verifies that the host, port, key and data-type options are all
     * present, and then lets {@link GaussDBRedisParameters#buildWithConfig} populate the parameter
     * holder.
     *
     * @param pluginConfig configuration of this sink
     * @throws GaussDBRedisConnectorException with {@link
     *     SeaTunnelAPIErrorCode#CONFIG_VALIDATION_FAILED} when the existence check fails
     * @throws PrepareFailException declared by the overridden contract; not thrown directly here
     */
    @Override
    public void prepare(Config pluginConfig) throws PrepareFailException {
        this.pluginConfig = pluginConfig;

        final CheckResult requiredOptions =
                CheckConfigUtil.checkAllExists(
                        pluginConfig,
                        GaussDBRedisConfig.HOST.key(),
                        GaussDBRedisConfig.PORT.key(),
                        GaussDBRedisConfig.KEY.key(),
                        GaussDBRedisConfig.DATA_TYPE.key());

        // Happy path first: all required options exist, so the parameters can be built.
        if (requiredOptions.isSuccess()) {
            redisParameters.buildWithConfig(pluginConfig);
            return;
        }

        // Validation failed: report plugin name, plugin type and the checker's message.
        final String failureDetail =
                String.format(
                        "PluginName: %s, PluginType: %s, Message: %s",
                        getPluginName(), PluginType.SINK, requiredOptions.getMsg());
        throw new GaussDBRedisConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, failureDetail);
    }

    /**
     * Records the row type that this sink will consume.
     *
     * @param seaTunnelRowType row type to remember
     */
    @Override
    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        this.seaTunnelRowType = seaTunnelRowType;
    }

    /**
     * Returns the row type previously recorded by {@link #setTypeInfo(SeaTunnelRowType)}.
     *
     * @return the stored row type; {@code null} if none has been set yet
     */
    @Override
    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
        return this.seaTunnelRowType;
    }

    /**
     * Creates a writer from the stored row type and parameter holder.
     *
     * @param context writer context; not used by this implementation
     * @return a new {@code GaussDBRedisSinkWriter}
     * @throws IOException declared by the overridden contract
     */
    @Override
    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
            throws IOException {
        return new GaussDBRedisSinkWriter(this.seaTunnelRowType, this.redisParameters);
    }
}
